package com.chukun.flink.stream.action.click;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Flink schema that converts {@link UserBehaviorEvent} records to and from their
 * JSON byte representation, so the same class can be used on both the source
 * (deserialization) and sink (serialization) side of a job.
 *
 * @author chukun
 * @version 1.0.0
 * @description Serialization and deserialization of user behavior data
 * @createTime 2022-05-21 22:22:00
 */
public class UserBehaviorEventSchema implements DeserializationSchema<UserBehaviorEvent>, SerializationSchema<UserBehaviorEvent> {

    // Both schema interfaces are Serializable; pin the id so that recompiling
    // the class does not silently change its serialized form.
    private static final long serialVersionUID = 1L;

    /**
     * Parses a JSON-encoded payload into a {@link UserBehaviorEvent}.
     *
     * @param message raw record bytes; may be {@code null}
     * @return the parsed event, or {@code null} when {@code message} is
     *         {@code null} or empty
     * @throws IOException declared by the interface contract
     */
    @Override
    public UserBehaviorEvent deserialize(byte[] message) throws IOException {
        // An empty payload carries no event, so treat it like a missing one
        // instead of relying on the JSON parser's handling of empty input.
        if (message == null || message.length == 0) {
            return null;
        }
        return JSON.parseObject(message, UserBehaviorEvent.class);
    }

    /**
     * Always reports that the stream continues; no element marks its end.
     *
     * @param userBehaviorEvent the most recently deserialized event (ignored)
     * @return {@code false} unconditionally
     */
    @Override
    public boolean isEndOfStream(UserBehaviorEvent userBehaviorEvent) {
        return false;
    }

    /**
     * Returns the type information for the elements produced by this schema.
     *
     * @return type information for {@link UserBehaviorEvent}
     */
    @Override
    public TypeInformation<UserBehaviorEvent> getProducedType() {
        return TypeInformation.of(UserBehaviorEvent.class);
    }

    /**
     * Encodes an event as UTF-8 JSON bytes.
     *
     * <p>The output is JSON (rather than {@code toString()}) so that bytes
     * written by this method can be read back by {@link #deserialize(byte[])}.
     *
     * @param userBehaviorEvent the event to encode; may be {@code null}
     * @return the JSON bytes, or an empty array when the event is {@code null}
     */
    @Override
    public byte[] serialize(UserBehaviorEvent userBehaviorEvent) {
        if (userBehaviorEvent == null) {
            return new byte[0];
        }
        return JSON.toJSONString(userBehaviorEvent).getBytes(StandardCharsets.UTF_8);
    }
}
